package test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.gfscold.trans.common.app.BaseApp;
import com.gfscold.trans.common.constant.CdcType;
import com.gfscold.trans.common.constant.RecordDeleteLog;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.json.JSONObject;

public class FlinkJavaTest01 extends BaseApp {
    public static void main(String[] args) {
        new FlinkJavaTest01().start(9999);
    }
    @Override
    public void handle(StreamExecutionEnvironment env) {
        //构建Source ,按照hostname进行区分
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("10.33.146.51")
                .port(5189)
                .databaseList("dwtest")
                .tableList("dwtest.*")
                .username("zhhuang4")
                .password("Gfs.2024!")
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> stream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "testDel");
        SingleOutputStreamOperator<CdcType> mapStream = stream.map(new MapFunction<String, CdcType>() {
            @Override
            public CdcType map(String s) throws Exception {
                ObjectMapper mapper = new ObjectMapper();
                CdcType cdc = mapper.readValue(s, CdcType.class);
                return cdc;
            }
        });

        SingleOutputStreamOperator<RecordDeleteLog> recordDeleteLogSteam = mapStream.flatMap(new FlatMapFunction<CdcType, RecordDeleteLog>() {
            @Override
            public void flatMap(CdcType cdcType, Collector<RecordDeleteLog> collector) throws Exception {
                if (cdcType.getOp().equals("d")) {
                    RecordDeleteLog recordDeleteLog = new RecordDeleteLog();
                    recordDeleteLog.setDeleteTime(cdcType.getTs_ms());
                    recordDeleteLog.setSchemaName(new JSONObject(cdcType.getSource()).getString("db"));
                    recordDeleteLog.setTableName(new JSONObject(cdcType.getSource()).getString("table"));
                    recordDeleteLog.setRecordId(new JSONObject(cdcType.getBefore()).getLong("id"));
                    recordDeleteLog.setRecordValue(cdcType.getBefore().toString());
                    collector.collect(recordDeleteLog);
                }
            }
        });
        recordDeleteLogSteam.print();


    }



}
